16.4 Skills 权限控制

17 分钟阅读

权限控制概述#

Skills 的权限控制是确保系统安全和数据保护的重要机制。本节将详细介绍 Skills 的权限模型、权限类型和权限管理方法。

权限模型#

1. 基于角色的访问控制(RBAC)#

1.1 角色定义

角色类型#

管理员角色#

  • 完全访问权限
  • 可以创建、修改、删除 Skills
  • 可以管理用户和角色

开发者角色#

  • 可以创建和修改 Skills
  • 可以执行 Skills
  • 不能删除其他用户的 Skills

用户角色#

  • 只能执行 Skills
  • 不能创建或修改 Skills
  • 访问受限的资源和功能

访客角色#

  • 只读权限
  • 只能查看 Skill 文档
  • 不能执行任何操作

1.2 权限定义

python
class Permission: def __init__(self, name, description): self.name = name self.description = description # Skill 权限 SKILL_CREATE = Permission("skill:create", "创建 Skill") SKILL_READ = Permission("skill:read", "读取 Skill") SKILL_UPDATE = Permission("skill:update", "更新 Skill") SKILL_DELETE = Permission("skill:delete", "删除 Skill") SKILL_EXECUTE = Permission("skill:execute", "执行 Skill") # 资源权限 RESOURCE_READ = Permission("resource:read", "读取资源") RESOURCE_WRITE = Permission("resource:write", "写入资源") RESOURCE_DELETE = Permission("resource:delete", "删除资源") # 系统权限 SYSTEM_CONFIG = Permission("system:config", "配置系统") SYSTEM_MONITOR = Permission("system:monitor", "监控系统") #### 1.3 角色权限映射 ```python class Role: def __init__(self, name, permissions): self.name = name self.permissions = permissions # 定义角色 ADMIN = Role("admin", [ SKILL_CREATE, SKILL_READ, SKILL_UPDATE, SKILL_DELETE, SKILL_EXECUTE, RESOURCE_READ, RESOURCE_WRITE, RESOURCE_DELETE, SYSTEM_CONFIG, SYSTEM_MONITOR ]) DEVELOPER = Role("developer", [ SKILL_CREATE, SKILL_READ, SKILL_UPDATE, SKILL_EXECUTE, RESOURCE_READ, RESOURCE_WRITE ]) USER = Role("user", [ SKILL_READ, SKILL_EXECUTE, RESOURCE_READ ]) GUEST = Role("guest", [ SKILL_READ ]) ~~~ ### 2. 基于属性的访问控制(ABAC) #### 2.1 属性定义 ```python class Attribute: def __init__(self, name, value): self.name = name self.value = value # 用户属性 class UserAttributes: def __init__(self, user_id, role, department, level): self.user_id = Attribute("user_id", user_id) self.role = Attribute("role", role) self.department = Attribute("department", department) self.level = Attribute("level", level) # 资源属性 class ResourceAttributes: def __init__(self, resource_id, owner, sensitivity, classification): self.resource_id = Attribute("resource_id", resource_id) self.owner = Attribute("owner", owner) self.sensitivity = Attribute("sensitivity", sensitivity) self.classification = Attribute("classification", classification) # 环境属性 class EnvironmentAttributes: def __init__(self, time, location, device): self.time = Attribute("time", time) self.location = Attribute("location", location) self.device = Attribute("device", device) #### 2.2 策略定义 ```python class AccessPolicy: def __init__(self, name, condition, effect): self.name = name self.condition = condition self.effect = effect # 示例策略 POLICIES = [ AccessPolicy( name="admin_full_access", condition=lambda user, resource, env: user.role.value == "admin", effect="allow" ), AccessPolicy( name="owner_access", condition=lambda user, resource, env: user.user_id.value == resource.owner.value, effect="allow" ), AccessPolicy( name="sensitivity_restriction", condition=lambda user, resource, env: resource.sensitivity.value == "high" and user.level.value < 3, effect="deny" ), AccessPolicy( name="business_hours_only", condition=lambda user, resource, env: not (9 <= env.time.value.hour < 17), effect="deny" ) ] ~~~ ### 3. 权限检查 #### 3.1 RBAC 检查 ```python class RBACChecker: def __init__(self): self.user_roles = {} self.role_permissions = {} def assign_role(self, user_id, role): if user_id not in self.user_roles: self.user_roles[user_id] = [] self.user_roles[user_id].append(role) def check_permission(self, user_id, permission): # 获取用户角色 roles = self.user_roles.get(user_id, []) # 检查每个角色是否有该权限 for role in roles: if permission in role.permissions: return True return False def check_skill_permission(self, user_id, skill_id, action): permission = Permission(f"skill:{action}", "") return self.check_permission(user_id, permission) #### 3.2 ABAC 检查 class ABACChecker: def __init__(self): self.policies = [] def add_policy(self, policy): self.policies.append(policy) def check_access(self, user, resource, environment): for policy in self.policies: if policy.condition(user, resource, environment): return policy.effect == "allow" return False def check_skill_access(self, user, skill, environment): return self.check_access(user, skill, environment) ~~~ ## 资源访问控制 ### 1. 文件系统访问 #### 1.1 文件权限 ```python class FilePermission: def __init__(self, path, read, write, execute): self.path = path self.read = read self.write = write self.execute = execute class FileAccessController: def __init__(self): self.permissions = {} def set_permission(self, path, read=False, write=False, execute=False): self.permissions[path] = FilePermission(path, read, write, execute) def check_read(self, user_id, path): permission = self.permissions.get(path) if not permission: return False # 检查用户是否有读权限 if permission.read: return True # 检查用户是否是文件所有者 if self.is_owner(user_id, path): return True return False def check_write(self, user_id, path): permission = self.permissions.get(path) if not permission: return False if permission.write: return True if self.is_owner(user_id, path): return True return False def is_owner(self, user_id, path): # 检查用户是否是文件所有者 pass

1.2 目录访问

python
class DirectoryAccessController: def __init__(self): self.access_rules = {} def set_access_rule(self, directory, allowed_users, allowed_roles): self.access_rules[directory] = { "users": allowed_users, "roles": allowed_roles } def check_access(self, user_id, user_roles, directory): rule = self.access_rules.get(directory) if not rule: return False if user_id in rule["users"]: return True for role in user_roles: if role in rule["roles"]: return True
bash
    return False
bash
### 2. API 访问控制
#### 2.1 API 权限
```python
class APIPermission:
    def __init__(self, endpoint, method, required_permissions):
        self.endpoint = endpoint
        self.method = method
        self.required_permissions = required_permissions

class APIAccessController:
    def __init__(self):
        self.api_permissions = {}

    def register_endpoint(self, endpoint, method, required_permissions):
        key = f"{method}:{endpoint}"
        self.api_permissions[key] = APIPermission(
            endpoint, method, required_permissions
        )

    def check_access(self, user_id, endpoint, method):
        key = f"{method}:{endpoint}"
        permission = self.api_permissions.get(key)

        if not permission:
            return False

        # 检查用户是否有所有必需的权限
        for required_perm in permission.required_permissions:
            if not self.has_permission(user_id, required_perm):
                return False

        return True

    def has_permission(self, user_id, permission):
        # 检查用户是否有特定权限
        pass

2.2 速率限制

python
class RateLimiter: def __init__(self): self.requests = defaultdict(list) def check_rate_limit(self, user_id, endpoint, limit, window): now = datetime.now() window_start = now - timedelta(seconds=window) user_requests = self.requests[user_id] recent_requests = [ req for req in user_requests if req["timestamp"] >= window_start and req["endpoint"] == endpoint ] if len(recent_requests) >= limit: return False user_requests.append({ "endpoint": endpoint, "timestamp": now }) return True

3. 数据访问控制#

3.1 数据库访问

python
class DatabaseAccessController: def __init__(self): self.table_permissions = {} def set_table_permission(self, table, permissions): self.table_permissions[table] = permissions def check_select(self, user_id, table): permission = self.table_permissions.get(table) if not permission: return False return permission.get("select", False) def check_insert(self, user_id, table): permission = self.table_permissions.get(table) if not permission: return False return permission.get("insert", False) def check_update(self, user_id, table): permission = self.table_permissions.get(table) if not permission: return False return permission.get("update", False) def check_delete(self, user_id, table): permission = self.table_permissions.get(table) if not permission: return False return permission.get("delete", False) #### 3.2 字段级访问 class FieldAccessController: def __init__(self): self.field_permissions = {} def set_field_permission(self, table, field, allowed_roles): key = f"{table}.{field}" self.field_permissions[key] = allowed_roles def check_field_access(self, user_roles, table, field): key = f"{table}.{field}" allowed_roles = self.field_permissions.get(key, []) for role in user_roles: if role in allowed_roles: return True return False def filter_fields(self, user_roles, table, fields): accessible_fields = [] for field in fields: if self.check_field_access(user_roles, table, field): accessible_fields.append(field) return accessible_fields

Skill 权限管理#

1. Skill 创建权限#

bash
python

class SkillCreationManager:
    def __init__(self, permission_checker):
        self.permission_checker = permission_checker

    def can_create_skill(self, user_id, skill_definition):
        # 检查用户是否有创建 Skill 的权限
        if not self.permission_checker.check_permission(user_id, SKILL_CREATE):
            return False

        # 检查 Skill 定义是否符合规范
        if not self.validate_skill_definition(skill_definition):
            return False

        # 检查资源访问权限
        if not self.check_resource_access(user_id, skill_definition):
            return False

        return True

    def validate_skill_definition(self, definition):
        # 验证 Skill 定义
        required_fields = ["name", "version", "description"]
        for field in required_fields:
            if field not in definition:
                return False
        return True

    def check_resource_access(self, user_id, definition):
        # 检查 Skill 需要访问的资源
        resources = definition.get("resources", [])
        for resource in resources:
            if not self.permission_checker.check_resource_access(user_id, resource):
                return False
        return True
```

### 2. Skill 执行权限
```python
class SkillExecutionManager:
    def __init__(self, permission_checker):
        self.permission_checker = permission_checker

    def can_execute_skill(self, user_id, skill_id, parameters):
        if not self.permission_checker.check_permission(user_id, SKILL_EXECUTE):
            return False
        skill = self.get_skill(skill_id)
        if not skill:
            return False
        if not self.check_skill_specific_permission(user_id, skill):
            return False
        if not self.check_parameter_permissions(user_id, skill, parameters):
            return False
        return True

    def check_skill_specific_permission(self, user_id, skill):
        required_permissions = skill.get_required_permissions()
        for permission in required_permissions:
            if not self.permission_checker.check_permission(user_id, permission):
                return False
        return True

    def check_parameter_permissions(self, user_id, skill, parameters):
        for key, value in parameters.items():
            if self.is_resource_reference(value):
                if not self.permission_checker.check_resource_access(user_id, value):
                    return False
        return True

    def is_resource_reference(self, value):
        return isinstance(value, str) and value.startswith("resource://")

3. Skill 修改权限#

bash
python

class SkillModificationManager:
    def __init__(self, permission_checker):
        self.permission_checker = permission_checker

    def can_modify_skill(self, user_id, skill_id):
        # 检查用户是否有修改 Skill 的权限
        if not self.permission_checker.check_permission(user_id, SKILL_UPDATE):
            return False

        # 检查 Skill 是否存在
        skill = self.get_skill(skill_id)
        if not skill:
            return False

        # 检查用户是否是 Skill 所有者
        if not self.is_skill_owner(user_id, skill):
            return False

        return True

    def is_skill_owner(self, user_id, skill):
        return skill.owner_id == user_id
```

### 4. Skill 删除权限
```python
class SkillDeletionManager:
    def __init__(self, permission_checker):
        self.permission_checker = permission_checker

    def can_delete_skill(self, user_id, skill_id):
        if not self.permission_checker.check_permission(user_id, SKILL_DELETE):
            return False
        skill = self.get_skill(skill_id)
        if not skill:
            return False
        if not (self.is_skill_owner(user_id, skill) or
                self.permission_checker.is_admin(user_id)):
            return False
        if self.has_dependents(skill_id):
            return False
        return True

    def is_skill_owner(self, user_id, skill):
        return skill.owner_id == user_id

    def has_dependents(self, skill_id):
        pass
```

## 审计日志
### 1. 访问日志
```python
class AccessLogger:
    def __init__(self):
        self.logs = []

    def log_access(self, user_id, resource, action, result):
        log_entry = {
            "timestamp": datetime.now(),
            "user_id": user_id,
            "resource": resource,
            "action": action,
            "result": result
        }
        self.logs.append(log_entry)

    def get_access_logs(self, user_id=None, resource=None, action=None):
        filtered_logs = self.logs

        if user_id:
            filtered_logs = [log for log in filtered_logs if log["user_id"] == user_id]

        if resource:
            filtered_logs = [log for log in filtered_logs if log["resource"] == resource]

        if action:
            filtered_logs = [log for log in filtered_logs if log["action"] == action]

        return filtered_logs
```

### 2. 权限变更日志
```python
class PermissionChangeLogger:
    def __init__(self):
        self.changes = []

    def log_permission_change(self, user_id, target, old_permissions, new_permissions, changed_by):
        change_entry = {
            "timestamp": datetime.now(),
            "user_id": user_id,
            "target": target,
            "old_permissions": old_permissions,
            "new_permissions": new_permissions,
            "changed_by": changed_by
        }
        self.changes.append(change_entry)

    def get_permission_changes(self, user_id=None, target=None):
        filtered_changes = self.changes
        if user_id:
            filtered_changes = [change for change in filtered_changes if change["user_id"] == user_id]
        if target:
            filtered_changes = [change for change in filtered_changes if change["target"] == target]
        return filtered_changes
```

标记本节教程为已读

记录您的学习进度,方便后续查看。